/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static java.util.Objects.requireNonNull;

import java.lang.ref.WeakReference;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.base.Objects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * <b>This realisation improve performance of classical LRU
 * cache up to 3 times via reduce GC job.</b>
 * </p>
 * The classical block cache implementation that is memory-aware using {@link HeapSize},
 * memory-bound using an
 * LRU eviction algorithm, and concurrent: backed by a {@link ConcurrentHashMap} and with a
 * non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock}
 * operations.
 * </p>
 * Contains three levels of block priority to allow for scan-resistance and in-memory families
 * {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder#setInMemory(boolean)} (An
 * in-memory column family is a column family that should be served from memory if possible):
 * single-access, multiple-accesses, and in-memory priority. A block is added with an in-memory
 * priority flag if {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptor#isInMemory()},
 * otherwise a block becomes a single access priority the first time it is read into this block
 * cache. If a block is accessed again while in cache, it is marked as a multiple access priority
 * block. This delineation of blocks is used to prevent scans from thrashing the cache adding a
 * least-frequently-used element to the eviction algorithm.
 * <p/>
 * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each
 * priority will retain close to its maximum size, however, if any priority is not using its entire
 * chunk the others are able to grow beyond their chunk size.
 * <p/>
 * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The
 * block size is not especially important as this cache is fully dynamic in its sizing of blocks. It
 * is only used for pre-allocating data structures and in initial heap estimation of the map.
 * <p/>
 * The detailed constructor defines the sizes for the three priorities (they should total to the
 * <code>maximum size</code> defined). It also sets the levels that trigger and control the eviction
 * thread.
 * <p/>
 * The <code>acceptable size</code> is the cache size level which triggers the eviction process to
 * start. It evicts enough blocks to get the size below the minimum size specified.
 * <p/>
 * Eviction happens in a separate thread and involves a single full-scan of the map. It determines
 * how many bytes must be freed to reach the minimum size, and then while scanning determines the
 * fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times
 * bytes to free). It then uses the priority chunk sizes to evict fairly according to the relative
 * sizes and usage.
 * <p/>
 * Adaptive LRU cache lets speed up performance while we are reading much more data than can fit
 * into BlockCache and it is the cause of a high rate of evictions. This in turn leads to heavy
 * Garbage Collector works. So a lot of blocks put into BlockCache but never read, but spending
 * a lot of CPU resources for cleaning. We could avoid this situation via parameters:
 * <p/>
 * <b>hbase.lru.cache.heavy.eviction.count.limit</b>  - set how many times we have to run the
 * eviction process that starts to avoid putting data to BlockCache. By default it is 0 and it
 * meats the feature will start at the beginning. But if we have some times short reading the same
 * data and some times long-term reading - we can divide it by this parameter. For example we know
 * that our short reading used to be about 1 minutes, then we have to set the parameter about 10
 * and it will enable the feature only for long time massive reading (after ~100 seconds). So when
 * we use short-reading and want all of them in the cache we will have it (except for eviction of
 * course). When we use long-term heavy reading the feature will be enabled after some time and
 * bring better performance.
 * <p/>
 * <b>hbase.lru.cache.heavy.eviction.mb.size.limit</b> - set how many bytes in 10 seconds desirable
 * putting into BlockCache (and evicted from it). The feature will try to reach this value and
 * maintain it. Don't try to set it too small because it leads to premature exit from this mode.
 * For powerful CPUs (about 20-40 physical cores)  it could be about 400-500 MB. Average system
 * (~10 cores) 200-300 MB. Some weak systems (2-5 cores) may be good with 50-100 MB.
 * How it works: we set the limit and after each ~10 second calculate how many bytes were freed.
 * Overhead = Freed Bytes Sum (MB) * 100 / Limit (MB) - 100;
 * For example we set the limit = 500 and were evicted 2000 MB. Overhead is:
 * 2000 * 100 / 500 - 100 = 300%
 * The feature is going to reduce a percent caching data blocks and fit evicted bytes closer to
 * 100% (500 MB). Some kind of an auto-scaling.
 * If freed bytes less then the limit we have got negative overhead.
 * For example if were freed 200 MB:
 * 200 * 100 / 500 - 100 = -60%
 * The feature will increase the percent of caching blocks.
 * That leads to fit evicted bytes closer to 100% (500 MB).
 * The current situation we can find out in the log of RegionServer:
 * BlockCache evicted (MB): 0, overhead (%): -100, heavy eviction counter: 0, current caching
 * DataBlock (%): 100 - means no eviction, 100% blocks is caching
 * BlockCache evicted (MB): 2000, overhead (%): 300, heavy eviction counter: 1, current caching
 * DataBlock (%): 97 - means eviction begin, reduce of caching blocks by 3%.
 * It help to tune your system and find out what value is better set. Don't try to reach 0%
 * overhead, it is impossible. Quite good 50-100% overhead,
 * it prevents premature exit from this mode.
 * <p/>
 * <b>hbase.lru.cache.heavy.eviction.overhead.coefficient</b> - set how fast we want to get the
 * result. If we know that our reading is heavy for a long time, we don't want to wait and can
 * increase the coefficient and get good performance sooner. But if we aren't sure we can do it
 * slowly and it could prevent premature exit from this mode. So, when the coefficient is higher
 * we can get better performance when heavy reading is stable. But when reading is changing we
 * can adjust to it and set  the coefficient to lower value.
 * For example, we set the coefficient = 0.01. It means the overhead (see above) will be
 * multiplied by 0.01 and the result is the value of reducing percent caching blocks. For example,
 * if the overhead = 300% and the coefficient = 0.01,
 * then percent of caching blocks will reduce by 3%.
 * Similar logic when overhead has got negative value (overshooting).  Maybe it is just short-term
 * fluctuation and we will try to stay in this mode. It helps avoid premature exit during
 * short-term fluctuation. Backpressure has simple logic: more overshooting - more caching blocks.
 * <p/>
 * Find more information about improvement: https://issues.apache.org/jira/browse/HBASE-23887
 */
@InterfaceAudience.Private
public class LruAdaptiveBlockCache implements FirstLevelBlockCache {

  private static final Logger LOG = LoggerFactory.getLogger(LruAdaptiveBlockCache.class);

  /**
   * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
   * evicting during an eviction run till the cache size is down to 80% of the total.
   */
  private static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";

  /**
   * Acceptable size of cache (no evictions if size < acceptable)
   */
  private static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME =
    "hbase.lru.blockcache.acceptable.factor";

  /**
   * Hard capacity limit of cache, will reject any put if size > this * acceptable
   */
  static final String LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME =
    "hbase.lru.blockcache.hard.capacity.limit.factor";
  private static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.single.percentage";
  private static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.multi.percentage";
  private static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME =
    "hbase.lru.blockcache.memory.percentage";

  /**
   * Configuration key to force data-block always (except in-memory are too much)
   * cached in memory for in-memory hfile, unlike inMemory, which is a column-family
   * configuration, inMemoryForceMode is a cluster-wide configuration
   */
  private static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME =
    "hbase.lru.rs.inmemoryforcemode";

  /* Default Configuration Parameters*/

  /* Backing Concurrent Map Configuration */
  static final float DEFAULT_LOAD_FACTOR = 0.75f;
  static final int DEFAULT_CONCURRENCY_LEVEL = 16;

  /* Eviction thresholds */
  private static final float DEFAULT_MIN_FACTOR = 0.95f;
  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;

  /* Priority buckets */
  private static final float DEFAULT_SINGLE_FACTOR = 0.25f;
  private static final float DEFAULT_MULTI_FACTOR = 0.50f;
  private static final float DEFAULT_MEMORY_FACTOR = 0.25f;

  private static final float DEFAULT_HARD_CAPACITY_LIMIT_FACTOR = 1.2f;

  private static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;

  /* Statistics thread */
  private static final int STAT_THREAD_PERIOD = 60 * 5;
  private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
  private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;

  private static final String LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT
    = "hbase.lru.cache.heavy.eviction.count.limit";
  // Default value actually equal to disable feature of increasing performance.
  // Because 2147483647 is about ~680 years (after that it will start to work)
  // We can set it to 0-10 and get the profit right now.
  // (see details https://issues.apache.org/jira/browse/HBASE-23887).
  private static final int DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT = Integer.MAX_VALUE;

  private static final String LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT
    = "hbase.lru.cache.heavy.eviction.mb.size.limit";
  private static final long DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT = 500;

  private static final String LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT
    = "hbase.lru.cache.heavy.eviction.overhead.coefficient";
  private static final float DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT = 0.01f;

  /**
   * Defined the cache map as {@link ConcurrentHashMap} here, because in
   * {@link LruAdaptiveBlockCache#getBlock}, we need to guarantee the atomicity
   * of map#computeIfPresent (key, func). Besides, the func method must execute exactly once only
   * when the key is present and under the lock context, otherwise the reference count will be
   * messed up. Notice that the
   * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that.
   */
  private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;

  /** Eviction lock (locked when eviction in process) */
  private transient final ReentrantLock evictionLock = new ReentrantLock(true);

  private final long maxBlockSize;

  /** Volatile boolean to track if we are in an eviction process or not */
  private volatile boolean evictionInProgress = false;

  /** Eviction thread */
  private transient final EvictionThread evictionThread;

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setNameFormat("LruAdaptiveBlockCacheStatsExecutor").setDaemon(true).build());

  /** Current size of cache */
  private final AtomicLong size;

  /** Current size of data blocks */
  private final LongAdder dataBlockSize;

  /** Current number of cached elements */
  private final AtomicLong elements;

  /** Current number of cached data block elements */
  private final LongAdder dataBlockElements;

  /** Cache access count (sequential ID) */
  private final AtomicLong count;

  /** hard capacity limit */
  private final float hardCapacityLimitFactor;

  /** Cache statistics */
  private final CacheStats stats;

  /** Maximum allowable size of cache (block put if size > max, evict) */
  private long maxSize;

  /** Approximate block size */
  private final long blockSize;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private final float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private final float minFactor;

  /** Single access bucket size */
  private final float singleFactor;

  /** Multiple access bucket size */
  private final float multiFactor;

  /** In-memory bucket size */
  private final float memoryFactor;

  /** Overhead of the structure itself */
  private final long overhead;

  /** Whether in-memory hfile's data block has higher priority when evicting */
  private boolean forceInMemory;

  /**
   * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
   * external cache as L2.
   * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
   */
  private transient BlockCache victimHandler = null;

  /** Percent of cached data blocks */
  private volatile int cacheDataBlockPercent;

  /** Limit of count eviction process when start to avoid to cache blocks */
  private final int heavyEvictionCountLimit;

  /** Limit of volume eviction process when start to avoid to cache blocks */
  private final long heavyEvictionMbSizeLimit;

  /** Adjust auto-scaling via overhead of evition rate */
  private final float heavyEvictionOverheadCoefficient;

  /**
   * Default constructor.  Specify maximum size and expected average block
   * size (approximation is fine).
   *
   * <p>All other factors will be calculated based on defaults specified in
   * this class.
   *
   * @param maxSize   maximum size of cache, in bytes
   * @param blockSize approximate size of each block, in bytes
   */
  public LruAdaptiveBlockCache(long maxSize, long blockSize) {
    this(maxSize, blockSize, true);
  }

  /**
   * Constructor used for testing.  Allows disabling of the eviction thread.
   */
  public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread) {
    this(maxSize, blockSize, evictionThread,
      (int) Math.ceil(1.2 * maxSize / blockSize),
      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
      DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
      DEFAULT_SINGLE_FACTOR,
      DEFAULT_MULTI_FACTOR,
      DEFAULT_MEMORY_FACTOR,
      DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
      false,
      DEFAULT_MAX_BLOCK_SIZE,
      DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT,
      DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT,
      DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT);
  }

  public LruAdaptiveBlockCache(long maxSize, long blockSize,
    boolean evictionThread, Configuration conf) {
    this(maxSize, blockSize, evictionThread,
      (int) Math.ceil(1.2 * maxSize / blockSize),
      DEFAULT_LOAD_FACTOR,
      DEFAULT_CONCURRENCY_LEVEL,
      conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
      conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
      conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
      conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
      conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
      conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
        DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
      conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
      conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE),
      conf.getInt(LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT,
        DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT),
      conf.getLong(LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT,
        DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT),
      conf.getFloat(LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT,
        DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT));
  }

  public LruAdaptiveBlockCache(long maxSize, long blockSize, Configuration conf) {
    this(maxSize, blockSize, true, conf);
  }

  /**
   * Configurable constructor.  Use this constructor if not using defaults.
   *
   * @param maxSize             maximum size of this cache, in bytes
   * @param blockSize           expected average size of blocks, in bytes
   * @param evictionThread      whether to run evictions in a bg thread or not
   * @param mapInitialSize      initial size of backing ConcurrentHashMap
   * @param mapLoadFactor       initial load factor of backing ConcurrentHashMap
   * @param mapConcurrencyLevel initial concurrency factor for backing CHM
   * @param minFactor           percentage of total size that eviction will evict until
   * @param acceptableFactor    percentage of total size that triggers eviction
   * @param singleFactor        percentage of total size for single-access blocks
   * @param multiFactor         percentage of total size for multiple-access blocks
   * @param memoryFactor        percentage of total size for in-memory blocks
   * @param hardLimitFactor     hard capacity limit
   * @param forceInMemory       in-memory hfile's data block has higher priority when evicting
   * @param maxBlockSize        maximum block size for caching
   * @param heavyEvictionCountLimit   when starts AdaptiveLRU algoritm work
   * @param heavyEvictionMbSizeLimit  how many bytes desirable putting into BlockCache
   * @param heavyEvictionOverheadCoefficient  how aggressive AdaptiveLRU will reduce GC
   */
  public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread,
    int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
    float minFactor, float acceptableFactor, float singleFactor,
    float multiFactor, float memoryFactor, float hardLimitFactor,
    boolean forceInMemory, long maxBlockSize,
    int heavyEvictionCountLimit, long heavyEvictionMbSizeLimit,
    float heavyEvictionOverheadCoefficient) {
    this.maxBlockSize = maxBlockSize;
    if(singleFactor + multiFactor + memoryFactor != 1 ||
      singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
      throw new IllegalArgumentException("Single, multi, and memory factors " +
        " should be non-negative and total 1.0");
    }
    if (minFactor >= acceptableFactor) {
      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
    }
    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
      throw new IllegalArgumentException("all factors must be < 1");
    }
    this.maxSize = maxSize;
    this.blockSize = blockSize;
    this.forceInMemory = forceInMemory;
    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
    this.minFactor = minFactor;
    this.acceptableFactor = acceptableFactor;
    this.singleFactor = singleFactor;
    this.multiFactor = multiFactor;
    this.memoryFactor = memoryFactor;
    this.stats = new CacheStats(this.getClass().getSimpleName());
    this.count = new AtomicLong(0);
    this.elements = new AtomicLong(0);
    this.dataBlockElements = new LongAdder();
    this.dataBlockSize = new LongAdder();
    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
    this.size = new AtomicLong(this.overhead);
    this.hardCapacityLimitFactor = hardLimitFactor;
    if (evictionThread) {
      this.evictionThread = new EvictionThread(this);
      this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
    } else {
      this.evictionThread = null;
    }

    // check the bounds
    this.heavyEvictionCountLimit = Math.max(heavyEvictionCountLimit, 0);
    this.heavyEvictionMbSizeLimit = Math.max(heavyEvictionMbSizeLimit, 1);
    this.cacheDataBlockPercent = 100;
    heavyEvictionOverheadCoefficient = Math.min(heavyEvictionOverheadCoefficient, 1.0f);
    heavyEvictionOverheadCoefficient = Math.max(heavyEvictionOverheadCoefficient, 0.001f);
    this.heavyEvictionOverheadCoefficient = heavyEvictionOverheadCoefficient;

    // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
    // every five minutes.
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
      STAT_THREAD_PERIOD, TimeUnit.SECONDS);
  }

  @Override
  public void setVictimCache(BlockCache victimCache) {
    if (victimHandler != null) {
      throw new IllegalArgumentException("The victim cache has already been set");
    }
    victimHandler = requireNonNull(victimCache);
  }

  @Override
  public void setMaxSize(long maxSize) {
    this.maxSize = maxSize;
    if (this.size.get() > acceptableSize() && !evictionInProgress) {
      runEviction();
    }
  }

  public int getCacheDataBlockPercent() {
    return cacheDataBlockPercent;
  }

  /**
   * The block cached in LruAdaptiveBlockCache will always be an heap block: on the one side,
   * the heap access will be more faster then off-heap, the small index block or meta block
   * cached in CombinedBlockCache will benefit a lot. on other side, the LruAdaptiveBlockCache
   * size is always calculated based on the total heap size, if caching an off-heap block in
   * LruAdaptiveBlockCache, the heap size will be messed up. Here we will clone the block into an
   * heap block if it's an off-heap block, otherwise just use the original block. The key point is
   * maintain the refCnt of the block (HBASE-22127): <br>
   * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle; <br>
   * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
   * reservoir, if both RPC and LruAdaptiveBlockCache release the block, then it can be garbage
   * collected by JVM, so need a retain here.
   * @param buf the original block
   * @return an block with an heap memory backend.
   */
  private Cacheable asReferencedHeapBlock(Cacheable buf) {
    if (buf instanceof HFileBlock) {
      HFileBlock blk = ((HFileBlock) buf);
      if (blk.isSharedMem()) {
        return HFileBlock.deepCloneOnHeap(blk);
      }
    }
    // The block will be referenced by this LruAdaptiveBlockCache,
    // so should increase its refCnt here.
    return buf.retain();
  }

  // BlockCache implementation

  /**
   * Cache the block with the specified name and buffer.
   * <p>
   * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
   * this can happen, for which we compare the buffer contents.
   *
   * @param cacheKey block's cache key
   * @param buf      block buffer
   * @param inMemory if block is in-memory
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {

    // Some data blocks will not put into BlockCache when eviction rate too much.
    // It is good for performance
    // (see details: https://issues.apache.org/jira/browse/HBASE-23887)
    // How to calculate it can find inside EvictionThread class.
    if (cacheDataBlockPercent != 100 && buf.getBlockType().isData()) {
      // It works like filter - blocks which two last digits of offset
      // more than we calculate in Eviction Thread will not put into BlockCache
      if (cacheKey.getOffset() % 100 >= cacheDataBlockPercent) {
        return;
      }
    }

    if (buf.heapSize() > maxBlockSize) {
      // If there are a lot of blocks that are too
      // big this can make the logs way too noisy.
      // So we log 2%
      if (stats.failInsert() % 50 == 0) {
        LOG.warn("Trying to cache too large a block "
          + cacheKey.getHfileName() + " @ "
          + cacheKey.getOffset()
          + " is " + buf.heapSize()
          + " which is larger than " + maxBlockSize);
      }
      return;
    }

    LruCachedBlock cb = map.get(cacheKey);
    if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this,
      cacheKey, buf)) {
      return;
    }
    long currentSize = size.get();
    long currentAcceptableSize = acceptableSize();
    long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
    if (currentSize >= hardLimitSize) {
      stats.failInsert();
      if (LOG.isTraceEnabled()) {
        LOG.trace("LruAdaptiveBlockCache current size " + StringUtils.byteDesc(currentSize)
          + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
          + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
          + ", failed to put cacheKey:" + cacheKey + " into LruAdaptiveBlockCache.");
      }
      if (!evictionInProgress) {
        runEviction();
      }
      return;
    }
    // Ensure that the block is an heap one.
    buf = asReferencedHeapBlock(buf);
    cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
    long newSize = updateSizeMetrics(cb, false);
    map.put(cacheKey, cb);
    long val = elements.incrementAndGet();
    if (buf.getBlockType().isData()) {
      dataBlockElements.increment();
    }
    if (LOG.isTraceEnabled()) {
      long size = map.size();
      assertCounterSanity(size, val);
    }
    if (newSize > currentAcceptableSize && !evictionInProgress) {
      runEviction();
    }
  }

  /**
   * Sanity-checking for parity between actual block cache content and metrics.
   * Intended only for use with TRACE level logging and -ea JVM.
   */
  private static void assertCounterSanity(long mapSize, long counterVal) {
    if (counterVal < 0) {
      LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
        ", mapSize=" + mapSize);
      return;
    }
    if (mapSize < Integer.MAX_VALUE) {
      double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
      if (pct_diff > 0.05) {
        LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
          ", mapSize=" + mapSize);
      }
    }
  }

  /**
   * Cache the block with the specified name and buffer.
   * <p>
   * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache
   * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an
   * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap,
   * otherwise the caching size is based on off-heap.
   * @param cacheKey block's cache key
   * @param buf block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }

  /**
   * Helper function that updates the local size counter and also updates any
   * per-cf or per-blocktype metrics it can discern from given
   * {@link LruCachedBlock}
   */
  private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
    long heapsize = cb.heapSize();
    BlockType bt = cb.getBuffer().getBlockType();
    if (evict) {
      heapsize *= -1;
    }
    if (bt != null && bt.isData()) {
      dataBlockSize.add(heapsize);
    }
    return size.addAndGet(heapsize);
  }

  /**
   * Get the buffer of the block with the specified name.
   *
   * @param cacheKey           block's cache key
   * @param caching            true if the caller caches blocks on cache misses
   * @param repeat             Whether this is a repeat lookup for the same block
   *                           (used to avoid double counting cache misses when doing double-check
   *                           locking)
   * @param updateCacheMetrics Whether to update cache metrics or not
   *
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
    LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
      // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
      // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
      // the block and release, then we're retaining a block with refCnt=0 which is disallowed.
      // see HBASE-22422.
      val.getBuffer().retain();
      return val;
    });
    if (cb == null) {
      if (!repeat && updateCacheMetrics) {
        stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
      }
      // If there is another block cache then try and read there.
      // However if this is a retry ( second time in double checked locking )
      // And it's already a miss then the l2 will also be a miss.
      if (victimHandler != null && !repeat) {
        // The handler will increase result's refCnt for RPC, so need no extra retain.
        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
        // Promote this to L1.
        if (result != null) {
          if (caching) {
            cacheBlock(cacheKey, result, /* inMemory = */ false);
          }
        }
        return result;
      }
      return null;
    }
    if (updateCacheMetrics) {
      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
    }
    cb.access(count.incrementAndGet());
    return cb.getBuffer();
  }

  /**
   * Whether the cache contains block with specified cacheKey
   *
   * @return true if contains the block
   */
  @Override
  public boolean containsBlock(BlockCacheKey cacheKey) {
    return map.containsKey(cacheKey);
  }

  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    LruCachedBlock cb = map.get(cacheKey);
    return cb != null && evictBlock(cb, false) > 0;
  }

  /**
   * Evicts all blocks for a specific HFile. This is an
   * expensive operation implemented as a linear-time search through all blocks
   * in the cache. Ideally this should be a search in a log-access-time map.
   *
   * <p>
   * This is used for evict-on-close to remove all blocks of a specific HFile.
   *
   * @return the number of blocks evicted
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    int numEvicted = (int) map.keySet().stream().filter(key -> key.getHfileName().equals(hfileName))
      .filter(this::evictBlock).count();
    if (victimHandler != null) {
      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
    }
    return numEvicted;
  }

  /**
   * Evict the block, and it will be cached by the victim handler if exists &amp;&amp;
   * block may be read again later
   *
   * @param evictedByEvictionProcess true if the given block is evicted by
   *          EvictionThread
   * @return the heap size of evicted block
   */
  protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
    LruCachedBlock previous = map.remove(block.getCacheKey());
    if (previous == null) {
      return 0;
    }
    updateSizeMetrics(block, true);
    long val = elements.decrementAndGet();
    if (LOG.isTraceEnabled()) {
      long size = map.size();
      assertCounterSanity(size, val);
    }
    if (block.getBuffer().getBlockType().isData()) {
      dataBlockElements.decrement();
    }
    if (evictedByEvictionProcess) {
      // When the eviction of the block happened because of invalidation of HFiles, no need to
      // update the stats counter.
      stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
      if (victimHandler != null) {
        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
      }
    }
    // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
    // NOT move this up because if do that then the victimHandler may access the buffer with
    // refCnt = 0 which is disallowed.
    previous.getBuffer().release();
    return block.heapSize();
  }

  /**
   * Multi-threaded call to run the eviction process.
   */
  private void runEviction() {
    if (evictionThread == null) {
      evict();
    } else {
      evictionThread.evict();
    }
  }

  boolean isEvictionInProgress() {
    return evictionInProgress;
  }

  long getOverhead() {
    return overhead;
  }

  /**
   * Eviction method.
   *
   * Evict items in order of use, allowing delete items
   * which haven't been used for the longest amount of time.
   *
   * @return how many bytes were freed
   */
  long evict() {

    // Ensure only one eviction at a time
    if (!evictionLock.tryLock()) {
      return 0;
    }

    long bytesToFree = 0L;

    try {
      evictionInProgress = true;
      long currentSize = this.size.get();
      bytesToFree = currentSize - minSize();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Block cache LRU eviction started; Attempting to free " +
          StringUtils.byteDesc(bytesToFree) + " of total=" +
          StringUtils.byteDesc(currentSize));
      }

      if (bytesToFree <= 0) {
        return 0;
      }

      // Instantiate priority buckets
      BlockBucket bucketSingle
        = new BlockBucket("single", bytesToFree, blockSize, singleSize());
      BlockBucket bucketMulti
        = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
      BlockBucket bucketMemory
        = new BlockBucket("memory", bytesToFree, blockSize, memorySize());

      // Scan entire map putting into appropriate buckets
      for (LruCachedBlock cachedBlock : map.values()) {
        switch (cachedBlock.getPriority()) {
          case SINGLE: {
            bucketSingle.add(cachedBlock);
            break;
          }
          case MULTI: {
            bucketMulti.add(cachedBlock);
            break;
          }
          case MEMORY: {
            bucketMemory.add(cachedBlock);
            break;
          }
        }
      }

      long bytesFreed = 0;
      if (forceInMemory || memoryFactor > 0.999f) {
        long s = bucketSingle.totalSize();
        long m = bucketMulti.totalSize();
        if (bytesToFree > (s + m)) {
          // this means we need to evict blocks in memory bucket to make room,
          // so the single and multi buckets will be emptied
          bytesFreed = bucketSingle.free(s);
          bytesFreed += bucketMulti.free(m);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " from single and multi buckets");
          }
          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
          if (LOG.isTraceEnabled()) {
            LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
              " total from all three buckets ");
          }
        } else {
          // this means no need to evict block in memory bucket,
          // and we try best to make the ratio between single-bucket and
          // multi-bucket is 1:2
          long bytesRemain = s + m - bytesToFree;
          if (3 * s <= bytesRemain) {
            // single-bucket is small enough that no eviction happens for it
            // hence all eviction goes from multi-bucket
            bytesFreed = bucketMulti.free(bytesToFree);
          } else if (3 * m <= 2 * bytesRemain) {
            // multi-bucket is small enough that no eviction happens for it
            // hence all eviction goes from single-bucket
            bytesFreed = bucketSingle.free(bytesToFree);
          } else {
            // both buckets need to evict some blocks
            bytesFreed = bucketSingle.free(s - bytesRemain / 3);
            if (bytesFreed < bytesToFree) {
              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
            }
          }
        }
      } else {
        PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);

        bucketQueue.add(bucketSingle);
        bucketQueue.add(bucketMulti);
        bucketQueue.add(bucketMemory);

        int remainingBuckets = bucketQueue.size();

        BlockBucket bucket;
        while ((bucket = bucketQueue.poll()) != null) {
          long overflow = bucket.overflow();
          if (overflow > 0) {
            long bucketBytesToFree =
              Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
            bytesFreed += bucket.free(bucketBytesToFree);
          }
          remainingBuckets--;
        }
      }
      if (LOG.isTraceEnabled()) {
        long single = bucketSingle.totalSize();
        long multi = bucketMulti.totalSize();
        long memory = bucketMemory.totalSize();
        LOG.trace("Block cache LRU eviction completed; " +
          "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
          "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
          "single=" + StringUtils.byteDesc(single) + ", " +
          "multi=" + StringUtils.byteDesc(multi) + ", " +
          "memory=" + StringUtils.byteDesc(memory));
      }
    } finally {
      stats.evict();
      evictionInProgress = false;
      evictionLock.unlock();
    }
    return bytesToFree;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
      .add("blockCount", getBlockCount())
      .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
      .add("freeSize", StringUtils.byteDesc(getFreeSize()))
      .add("maxSize", StringUtils.byteDesc(getMaxSize()))
      .add("heapSize", StringUtils.byteDesc(heapSize()))
      .add("minSize", StringUtils.byteDesc(minSize()))
      .add("minFactor", minFactor)
      .add("multiSize", StringUtils.byteDesc(multiSize()))
      .add("multiFactor", multiFactor)
      .add("singleSize", StringUtils.byteDesc(singleSize()))
      .add("singleFactor", singleFactor)
      .toString();
  }

  /**
   * Used to group blocks into priority buckets.  There will be a BlockBucket
   * for each priority (single, multi, memory).  Once bucketed, the eviction
   * algorithm takes the appropriate number of elements out of each according
   * to configuration parameters and their relatives sizes.
   */
  private class BlockBucket implements Comparable<BlockBucket> {

    private final String name;
    private final LruCachedBlockQueue queue;
    private long totalSize = 0;
    private final long bucketSize;

    public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
      this.name = name;
      this.bucketSize = bucketSize;
      queue = new LruCachedBlockQueue(bytesToFree, blockSize);
      totalSize = 0;
    }

    public void add(LruCachedBlock block) {
      totalSize += block.heapSize();
      queue.add(block);
    }

    public long free(long toFree) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("freeing {} from {}", StringUtils.byteDesc(toFree), this);
      }
      LruCachedBlock cb;
      long freedBytes = 0;
      while ((cb = queue.pollLast()) != null) {
        freedBytes += evictBlock(cb, true);
        if (freedBytes >= toFree) {
          return freedBytes;
        }
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace("freeing {} from {}", StringUtils.byteDesc(toFree), this);
      }
      return freedBytes;
    }

    public long overflow() {
      return totalSize - bucketSize;
    }

    public long totalSize() {
      return totalSize;
    }

    @Override
    public int compareTo(BlockBucket that) {
      return Long.compare(this.overflow(), that.overflow());
    }

    @Override
    public boolean equals(Object that) {
      if (!(that instanceof BlockBucket)) {
        return false;
      }
      return compareTo((BlockBucket)that) == 0;
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(name, bucketSize, queue, totalSize);
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(this)
        .add("name", name)
        .add("totalSize", StringUtils.byteDesc(totalSize))
        .add("bucketSize", StringUtils.byteDesc(bucketSize))
        .toString();
    }
  }

  /**
   * Get the maximum size of this cache.
   *
   * @return max size in bytes
   */

  @Override
  public long getMaxSize() {
    return this.maxSize;
  }

  @Override
  public long getCurrentSize() {
    return this.size.get();
  }

  @Override
  public long getCurrentDataSize() {
    return this.dataBlockSize.sum();
  }

  @Override
  public long getFreeSize() {
    return getMaxSize() - getCurrentSize();
  }

  @Override
  public long size() {
    return getMaxSize();
  }

  @Override
  public long getBlockCount() {
    return this.elements.get();
  }

  @Override
  public long getDataBlockCount() {
    return this.dataBlockElements.sum();
  }

  EvictionThread getEvictionThread() {
    return this.evictionThread;
  }

  /*
   * Eviction thread.  Sits in waiting state until an eviction is triggered
   * when the cache size grows above the acceptable level.<p>
   *
   * Thread is triggered into action by {@link LruAdaptiveBlockCache#runEviction()}
   */
  static class EvictionThread extends Thread {

    private WeakReference<LruAdaptiveBlockCache> cache;
    private volatile boolean go = true;
    // flag set after enter the run method, used for test
    private boolean enteringRun = false;

    public EvictionThread(LruAdaptiveBlockCache cache) {
      super(Thread.currentThread().getName() + ".LruAdaptiveBlockCache.EvictionThread");
      setDaemon(true);
      this.cache = new WeakReference<>(cache);
    }

    @Override
    public void run() {
      enteringRun = true;
      long freedSumMb = 0;
      int heavyEvictionCount = 0;
      int freedDataOverheadPercent = 0;
      long startTime = EnvironmentEdgeManager.currentTime();
      while (this.go) {
        synchronized (this) {
          try {
            this.wait(1000 * 10/*Don't wait for ever*/);
          } catch (InterruptedException e) {
            LOG.warn("Interrupted eviction thread ", e);
            Thread.currentThread().interrupt();
          }
        }
        LruAdaptiveBlockCache cache = this.cache.get();
        if (cache == null) {
          break;
        }
        freedSumMb += cache.evict()/1024/1024;
        /*
         * Sometimes we are reading more data than can fit into BlockCache
         * and it is the cause a high rate of evictions.
         * This in turn leads to heavy Garbage Collector works.
         * So a lot of blocks put into BlockCache but never read,
         * but spending a lot of CPU resources.
         * Here we will analyze how many bytes were freed and decide
         * decide whether the time has come to reduce amount of caching blocks.
         * It help avoid put too many blocks into BlockCache
         * when evict() works very active and save CPU for other jobs.
         * More delails: https://issues.apache.org/jira/browse/HBASE-23887
         */

        // First of all we have to control how much time
        // has passed since previuos evict() was launched
        // This is should be almost the same time (+/- 10s)
        // because we get comparable volumes of freed bytes each time.
        // 10s because this is default period to run evict() (see above this.wait)
        long stopTime = EnvironmentEdgeManager.currentTime();
        if ((stopTime - startTime) > 1000 * 10 - 1) {
          // Here we have to calc what situation we have got.
          // We have the limit "hbase.lru.cache.heavy.eviction.bytes.size.limit"
          // and can calculte overhead on it.
          // We will use this information to decide,
          // how to change percent of caching blocks.
          freedDataOverheadPercent =
            (int) (freedSumMb * 100 / cache.heavyEvictionMbSizeLimit) - 100;
          if (freedSumMb > cache.heavyEvictionMbSizeLimit) {
            // Now we are in the situation when we are above the limit
            // But maybe we are going to ignore it because it will end quite soon
            heavyEvictionCount++;
            if (heavyEvictionCount > cache.heavyEvictionCountLimit) {
              // It is going for a long time and we have to reduce of caching
              // blocks now. So we calculate here how many blocks we want to skip.
              // It depends on:
              // 1. Overhead - if overhead is big we could more aggressive
              // reducing amount of caching blocks.
              // 2. How fast we want to get the result. If we know that our
              // heavy reading for a long time, we don't want to wait and can
              // increase the coefficient and get good performance quite soon.
              // But if we don't sure we can do it slowly and it could prevent
              // premature exit from this mode. So, when the coefficient is
              // higher we can get better performance when heavy reading is stable.
              // But when reading is changing we can adjust to it and set
              // the coefficient to lower value.
              int change =
                (int) (freedDataOverheadPercent * cache.heavyEvictionOverheadCoefficient);
              // But practice shows that 15% of reducing is quite enough.
              // We are not greedy (it could lead to premature exit).
              change = Math.min(15, change);
              change = Math.max(0, change); // I think it will never happen but check for sure
              // So this is the key point, here we are reducing % of caching blocks
              cache.cacheDataBlockPercent -= change;
              // If we go down too deep we have to stop here, 1% any way should be.
              cache.cacheDataBlockPercent = Math.max(1, cache.cacheDataBlockPercent);
            }
          } else {
            // Well, we have got overshooting.
            // Mayby it is just short-term fluctuation and we can stay in this mode.
            // It help avoid permature exit during short-term fluctuation.
            // If overshooting less than 90%, we will try to increase the percent of
            // caching blocks and hope it is enough.
            if (freedSumMb >= cache.heavyEvictionMbSizeLimit * 0.1) {
              // Simple logic: more overshooting - more caching blocks (backpressure)
              int change = (int) (-freedDataOverheadPercent * 0.1 + 1);
              cache.cacheDataBlockPercent += change;
              // But it can't be more then 100%, so check it.
              cache.cacheDataBlockPercent = Math.min(100, cache.cacheDataBlockPercent);
            } else {
              // Looks like heavy reading is over.
              // Just exit form this mode.
              heavyEvictionCount = 0;
              cache.cacheDataBlockPercent = 100;
            }
          }
          LOG.info("BlockCache evicted (MB): {}, overhead (%): {}, " +
              "heavy eviction counter: {}, " +
              "current caching DataBlock (%): {}",
            freedSumMb, freedDataOverheadPercent,
            heavyEvictionCount, cache.cacheDataBlockPercent);

          freedSumMb = 0;
          startTime = stopTime;
        }
      }
    }

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
      justification="This is what we want")
    public void evict() {
      synchronized (this) {
        this.notifyAll();
      }
    }

    synchronized void shutdown() {
      this.go = false;
      this.notifyAll();
    }

    /**
     * Used for the test.
     */
    boolean isEnteringRun() {
      return this.enteringRun;
    }
  }

  /*
   * Statistics thread.  Periodically prints the cache statistics to the log.
   */
  static class StatisticsThread extends Thread {

    private final LruAdaptiveBlockCache lru;

    public StatisticsThread(LruAdaptiveBlockCache lru) {
      super("LruAdaptiveBlockCacheStats");
      setDaemon(true);
      this.lru = lru;
    }

    @Override
    public void run() {
      lru.logStats();
    }
  }

  public void logStats() {
    // Log size
    long totalSize = heapSize();
    long freeSize = maxSize - totalSize;
    LruAdaptiveBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
      "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
      "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
      "blockCount=" + getBlockCount() + ", " +
      "accesses=" + stats.getRequestCount() + ", " +
      "hits=" + stats.getHitCount() + ", " +
      "hitRatio=" + (stats.getHitCount() == 0 ?
      "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
      "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
      "cachingHits=" + stats.getHitCachingCount() + ", " +
      "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
      "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
      "evictions=" + stats.getEvictionCount() + ", " +
      "evicted=" + stats.getEvictedCount() + ", " +
      "evictedPerRun=" + stats.evictedPerEviction());
  }

  /**
   * Get counter statistics for this cache.
   *
   * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
   * of the eviction processes.
   */
  @Override
  public CacheStats getStats() {
    return this.stats;
  }

  public final static long CACHE_FIXED_OVERHEAD =
    ClassSize.estimateBase(LruAdaptiveBlockCache.class, false);

  @Override
  public long heapSize() {
    return getCurrentSize();
  }

  private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
    // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
      + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
      + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    final Iterator<LruCachedBlock> iterator = map.values().iterator();

    return new Iterator<CachedBlock>() {
      private final long now = System.nanoTime();

      @Override
      public boolean hasNext() {
        return iterator.hasNext();
      }

      @Override
      public CachedBlock next() {
        final LruCachedBlock b = iterator.next();
        return new CachedBlock() {
          @Override
          public String toString() {
            return BlockCacheUtil.toString(this, now);
          }

          @Override
          public BlockPriority getBlockPriority() {
            return b.getPriority();
          }

          @Override
          public BlockType getBlockType() {
            return b.getBuffer().getBlockType();
          }

          @Override
          public long getOffset() {
            return b.getCacheKey().getOffset();
          }

          @Override
          public long getSize() {
            return b.getBuffer().heapSize();
          }

          @Override
          public long getCachedTime() {
            return b.getCachedTime();
          }

          @Override
          public String getFilename() {
            return b.getCacheKey().getHfileName();
          }

          @Override
          public int compareTo(CachedBlock other) {
            int diff = this.getFilename().compareTo(other.getFilename());
            if (diff != 0) {
              return diff;
            }
            diff = Long.compare(this.getOffset(), other.getOffset());
            if (diff != 0) {
              return diff;
            }
            if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
              throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
            }
            return Long.compare(other.getCachedTime(), this.getCachedTime());
          }

          @Override
          public int hashCode() {
            return b.hashCode();
          }

          @Override
          public boolean equals(Object obj) {
            if (obj instanceof CachedBlock) {
              CachedBlock cb = (CachedBlock)obj;
              return compareTo(cb) == 0;
            } else {
              return false;
            }
          }
        };
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  // Simple calculators of sizes given factors and maxSize

  long acceptableSize() {
    return (long)Math.floor(this.maxSize * this.acceptableFactor);
  }
  private long minSize() {
    return (long)Math.floor(this.maxSize * this.minFactor);
  }
  private long singleSize() {
    return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
  }
  private long multiSize() {
    return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
  }
  private long memorySize() {
    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
  }

  @Override
  public void shutdown() {
    if (victimHandler != null) {
      victimHandler.shutdown();
    }
    this.scheduleThreadPool.shutdown();
    for (int i = 0; i < 10; i++) {
      if (!this.scheduleThreadPool.isShutdown()) {
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while sleeping");
          Thread.currentThread().interrupt();
          break;
        }
      }
    }

    if (!this.scheduleThreadPool.isShutdown()) {
      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
      LOG.debug("Still running " + runnables);
    }
    this.evictionThread.shutdown();
  }

  /** Clears the cache. Used in tests. */
  public void clearCache() {
    this.map.clear();
    this.elements.set(0);
  }

  public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
    Map<DataBlockEncoding, Integer> counts = new EnumMap<>(DataBlockEncoding.class);
    for (LruCachedBlock block : map.values()) {
      DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
      Integer count = counts.get(encoding);
      counts.put(encoding, (count == null ? 0 : count) + 1);
    }
    return counts;
  }

  Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
    return map;
  }

  @Override
  public BlockCache[] getBlockCaches() {
    if (victimHandler != null) {
      return new BlockCache[] { this, this.victimHandler };
    }
    return null;
  }
}
